package com.parkingwang.learning.rx2;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class SchedulerDemo {

    public static void main(String[] args) {

        Observable.just("1")
                .subscribeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        System.out.println("map1-----" + Thread.currentThread().getName());//io
                        return s;
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        System.out.println("map2-----" + Thread.currentThread().getName());//io
                        return s;
                    }
                })
                .subscribeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        System.out.println("map3-----" + Thread.currentThread().getName());//io
                        return s;
                    }
                })
                //没有遇到observeOn之前都是上游数据，即线程切换取决于第一个subscribeOn的线程
                //遇到observeOn之前都是下游数据，每切换一次observeOn线程就改变一次下游数据的线程
                //observeOn如果写在subscribeOn之前，那么subscribeOn切换的线程将没有意义，observeOn的线程优先级高，之后的线程切换由observeOn的线程所决定，与subscribeOn无关了
                .observeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        System.out.println("map4-----" + Thread.currentThread().getName());//newThread
                        return s;
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe-----" + Thread.currentThread().getName());//main
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("onNext-----" + Thread.currentThread().getName());//io
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
